Producer-Consumer Problem & Race Conditions

Table of Contents

  1. Understanding the Producer-Consumer Problem
  2. Race Conditions Explained
  3. Solution Evolution
  4. Real-World Examples
  5. Advanced Patterns
  6. Key Takeaways

Understanding the Problem

What is the Producer-Consumer Problem?

The Producer-Consumer problem is a classic synchronization problem where:

  • Producers generate data and put it into a buffer
  • Consumers take data from the buffer and process it
  • Both operate concurrently
  • Buffer has limited capacity

Key Challenges

  1. Race Conditions - Multiple threads accessing shared buffer
  2. Buffer Overflow - Producer tries to add when buffer is full
  3. Buffer Underflow - Consumer tries to remove when buffer is empty
  4. Coordination - Producers and consumers must wait for each other

Race Conditions Explained

What is a Race Condition?

A race condition occurs when:

  • Multiple threads access shared data concurrently without proper synchronization
  • At least one thread modifies the data
  • The outcome depends on the timing of thread execution
  • Results can therefore be unpredictable, and often incorrect

Example: Simple Race Condition

class Counter {
private int count = 0;

public void increment() {
count++; // NOT atomic! Three operations:
// 1. Read count
// 2. Add 1
// 3. Write back
}

public int getCount() {
return count;
}
}

// Race condition in action
public class RaceConditionDemo {
public static void main(String[] args) throws InterruptedException {
Counter counter = new Counter();

// Create 1000 threads, each incrementing 1000 times
Thread[] threads = new Thread[1000];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
counter.increment();
}
});
threads[i].start();
}

// Wait for all threads
for (Thread thread : threads) {
thread.join();
}

// Expected: 1,000,000
// Actual: Usually less (e.g., 987,432)
System.out.println("Count: " + counter.getCount());
}
}

Why Race Conditions Happen

Thread 1              Thread 2
--------              --------
Read count (0)
                      Read count (0)
Add 1 (result: 1)
                      Add 1 (result: 1)
Write 1
                      Write 1

Result: Both threads read 0, both write 1. One increment is lost!
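
One way to close this gap is to make the read-modify-write a single indivisible step. The sketch below shows two common options, an AtomicInteger and a synchronized block. The class and method names (SafeCounterSketch, runAtomic, runSynchronized) are made up for illustration and are not part of the original example.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical names, for illustration only.
class SafeCounterSketch {

    // Option 1: AtomicInteger performs read, add, and write as one atomic operation.
    static int runAtomic(int threadCount, int incrementsPerThread) {
        AtomicInteger count = new AtomicInteger();
        runThreads(threadCount, () -> {
            for (int j = 0; j < incrementsPerThread; j++) {
                count.incrementAndGet();
            }
        });
        return count.get();
    }

    // Option 2: a synchronized block lets only one thread at a time run count[0]++.
    static int runSynchronized(int threadCount, int incrementsPerThread) {
        int[] count = {0};
        Object lock = new Object();
        runThreads(threadCount, () -> {
            for (int j = 0; j < incrementsPerThread; j++) {
                synchronized (lock) {
                    count[0]++;
                }
            }
        });
        // join() in runThreads makes the workers' writes visible to this thread.
        return count[0];
    }

    // Starts threadCount threads running the same body and waits for all of them.
    private static void runThreads(int threadCount, Runnable body) {
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(body);
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while joining", e);
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("Atomic: " + runAtomic(8, 10_000));
        System.out.println("Synchronized: " + runSynchronized(8, 10_000));
    }
}
```

With either option no increment is lost, so 8 threads doing 10,000 increments each always end at 80,000.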


Solution Evolution

Version 1: Unsafe Producer-Consumer ❌

import java.util.LinkedList;
import java.util.Queue;

class UnsafeBuffer {
private Queue<Integer> buffer = new LinkedList<>();
private int capacity = 10;

// RACE CONDITION: Multiple producers can add simultaneously
public void produce(Integer item) {
if (buffer.size() < capacity) {
buffer.add(item);
System.out.println("Produced: " + item);
} else {
System.out.println("Buffer full!");
}
}

// RACE CONDITION: Multiple consumers can remove simultaneously
public Integer consume() {
if (!buffer.isEmpty()) {
Integer item = buffer.poll();
System.out.println("Consumed: " + item);
return item;
} else {
System.out.println("Buffer empty!");
return null;
}
}
}

// Usage (UNSAFE)
class Producer implements Runnable {
private UnsafeBuffer buffer;
private int id;

public Producer(UnsafeBuffer buffer, int id) {
this.buffer = buffer;
this.id = id;
}

@Override
public void run() {
for (int i = 0; i < 10; i++) {
buffer.produce(id * 100 + i);
try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; }
}
}
}

class Consumer implements Runnable {
private UnsafeBuffer buffer;

public Consumer(UnsafeBuffer buffer) {
this.buffer = buffer;
}

@Override
public void run() {
for (int i = 0; i < 10; i++) {
buffer.consume();
try { Thread.sleep(150); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; }
}
}
}

Problems:

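  1. Check-then-act race condition in both methods: another thread can change the buffer between the size()/isEmpty() check and the add()/poll() call
  2. No coordination between producers and consumers
  3. No waiting: when the buffer is full the item is dropped, and when it is empty consume() returns null
  4. Data corruption possible, because LinkedList is not thread-safe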

Version 2: Synchronized with wait/notify ✅

import java.util.LinkedList;
import java.util.Queue;

class SynchronizedBuffer {
private Queue<Integer> buffer = new LinkedList<>();
private int capacity;

public SynchronizedBuffer(int capacity) {
this.capacity = capacity;
}

public synchronized void produce(Integer item) throws InterruptedException {
// Wait while buffer is full
while (buffer.size() == capacity) {
System.out.println("Buffer full, producer waiting...");
wait(); // Releases lock and waits
}

buffer.add(item);
System.out.println("Produced: " + item + " | Buffer size: " + buffer.size());

// Notify waiting consumers
notifyAll();
}

public synchronized Integer consume() throws InterruptedException {
// Wait while buffer is empty
while (buffer.isEmpty()) {
System.out.println("Buffer empty, consumer waiting...");
wait(); // Releases lock and waits
}

Integer item = buffer.poll();
System.out.println("Consumed: " + item + " | Buffer size: " + buffer.size());

// Notify waiting producers
notifyAll();

return item;
}

public synchronized int size() {
return buffer.size();
}
}

// Usage
class Producer implements Runnable {
private SynchronizedBuffer buffer;
private int id;

public Producer(SynchronizedBuffer buffer, int id) {
this.buffer = buffer;
this.id = id;
}

@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
buffer.produce(id * 100 + i);
Thread.sleep(100);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

class Consumer implements Runnable {
private SynchronizedBuffer buffer;
private int id;

public Consumer(SynchronizedBuffer buffer, int id) {
this.buffer = buffer;
this.id = id;
}

@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
Integer item = buffer.consume();
Thread.sleep(150);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

// Test
public class ProducerConsumerTest {
public static void main(String[] args) {
SynchronizedBuffer buffer = new SynchronizedBuffer(5);

// Create multiple producers and consumers
Thread p1 = new Thread(new Producer(buffer, 1), "Producer-1");
Thread p2 = new Thread(new Producer(buffer, 2), "Producer-2");
Thread c1 = new Thread(new Consumer(buffer, 1), "Consumer-1");
Thread c2 = new Thread(new Consumer(buffer, 2), "Consumer-2");

p1.start();
p2.start();
c1.start();
c2.start();
}
}

Key Points:

  • synchronized ensures mutual exclusion
  • wait() releases lock and suspends thread
  • notifyAll() wakes up all waiting threads
  • Always use while loop, not if (spurious wakeups)

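Important: Why while not if?

A woken thread must reacquire the lock before it continues. By then another thread may have emptied (or filled) the buffer again, and wait() can also return spuriously, so the condition has to be rechecked.

// WRONG - proceeds after wakeup even if the buffer is empty again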
if (buffer.isEmpty()) {
wait();
}

// CORRECT - recheck condition after waking up
while (buffer.isEmpty()) {
wait();
}
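
To make the difference concrete, here is a minimal sketch with two consumers waiting on one item. The class GuardedWaitSketch and its methods are hypothetical and exist only for this illustration. notifyAll() wakes both consumers, but only one can take the item; the other finds the queue empty when it rechecks and goes back to waiting. With an if instead of while, that second consumer would call remove() on an empty queue and fail.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;

// Hypothetical class, for illustration only.
class GuardedWaitSketch {
    private final Queue<Integer> items = new ArrayDeque<>();

    synchronized void put(int item) {
        items.add(item);
        notifyAll(); // wakes every waiting consumer, but only one item is available
    }

    synchronized int take() throws InterruptedException {
        while (items.isEmpty()) { // rechecked after every wakeup
            wait();
        }
        return items.remove();
    }

    // Two consumers each take one item; items arrive one at a time.
    static List<Integer> demo() {
        GuardedWaitSketch box = new GuardedWaitSketch();
        List<Integer> taken = Collections.synchronizedList(new ArrayList<>());
        Runnable consumer = () -> {
            try {
                taken.add(box.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread c1 = new Thread(consumer);
        Thread c2 = new Thread(consumer);
        c1.start();
        c2.start();
        try {
            Thread.sleep(50); // let both consumers start waiting (not needed for correctness)
            box.put(1);       // both wake up; one takes the item, the other waits again
            Thread.sleep(50);
            box.put(2);       // the remaining consumer gets this one
            c1.join();
            c2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<Integer> sorted = new ArrayList<>(taken);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println("Taken: " + demo());
    }
}
```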

Version 3: ReentrantLock with Conditions ✅

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class LockBasedBuffer {
private Queue<Integer> buffer = new LinkedList<>();
private int capacity;
private Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();

public LockBasedBuffer(int capacity) {
this.capacity = capacity;
}

public void produce(Integer item) throws InterruptedException {
lock.lock();
try {
// Wait while buffer is full
while (buffer.size() == capacity) {
System.out.println(Thread.currentThread().getName() +
" - Buffer full, waiting...");
notFull.await(); // Wait on condition
}

buffer.add(item);
System.out.println(Thread.currentThread().getName() +
" - Produced: " + item + " | Size: " + buffer.size());

notEmpty.signal(); // Signal consumers
} finally {
lock.unlock();
}
}

public Integer consume() throws InterruptedException {
lock.lock();
try {
// Wait while buffer is empty
while (buffer.isEmpty()) {
System.out.println(Thread.currentThread().getName() +
" - Buffer empty, waiting...");
notEmpty.await(); // Wait on condition
}

Integer item = buffer.poll();
System.out.println(Thread.currentThread().getName() +
" - Consumed: " + item + " | Size: " + buffer.size());

notFull.signal(); // Signal producers

return item;
} finally {
lock.unlock();
}
}
}

Advantages over synchronized:

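  • Separate conditions for producers and consumers, so each side wakes only threads that can make progress
  • Waiting and signaling are explicit: notFull and notEmpty name what each thread is waiting for
  • Can use signal() instead of signalAll() for efficiency, since every waiter on a condition is waiting for the same thing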
  • Fair lock option available
  • Interruptible lock acquisition
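
The last two points are not used in LockBasedBuffer itself, so here is a short sketch of what they look like in practice. The class LockOptionsSketch and its method names are assumptions made for this example; the lock calls themselves (the fairness constructor argument, tryLock with a timeout, lockInterruptibly) are standard ReentrantLock API.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class and method names, for illustration only.
class LockOptionsSketch {

    // Passing true requests a fair lock: the longest-waiting thread is favored.
    static final ReentrantLock FAIR_LOCK = new ReentrantLock(true);

    // Timed acquisition: returns false instead of blocking forever if the lock stays busy.
    static boolean tryWork(ReentrantLock lock, long timeoutMillis) {
        boolean acquired = false;
        try {
            acquired = lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
            if (acquired) {
                // ... critical section would go here ...
            }
            return acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return false;
        } finally {
            if (acquired) {
                lock.unlock();
            }
        }
    }

    // Interruptible acquisition: a thread blocked here can be cancelled with interrupt().
    static void workInterruptibly(ReentrantLock lock) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            // ... critical section would go here ...
        } finally {
            lock.unlock();
        }
    }

    // Demo: the calling thread holds the lock while a second thread attempts a timed acquisition.
    static boolean acquiredWhileHeldElsewhere() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] result = new boolean[1];
        lock.lock();
        try {
            Thread other = new Thread(() -> result[0] = tryWork(lock, 50));
            other.start();
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
        return result[0];
    }
}
```

A synchronized block offers none of these: a thread blocked on entry cannot time out or be interrupted, and there is no fairness setting.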

Version 4: BlockingQueue (Best Practice) ⭐

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;

class Producer implements Runnable {
private BlockingQueue<Integer> queue;
private int id;

public Producer(BlockingQueue<Integer> queue, int id) {
this.queue = queue;
this.id = id;
}

@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
int item = id * 100 + i;
queue.put(item); // Blocks if queue is full
System.out.println(Thread.currentThread().getName() +
" produced: " + item);
Thread.sleep(100);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

class Consumer implements Runnable {
private BlockingQueue<Integer> queue;
private int id;

public Consumer(BlockingQueue<Integer> queue, int id) {
this.queue = queue;
this.id = id;
}

@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
Integer item = queue.take(); // Blocks if queue is empty
System.out.println(Thread.currentThread().getName() +
" consumed: " + item);
Thread.sleep(150);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

public class BlockingQueueExample {
public static void main(String[] args) {
// Bounded blocking queue with capacity 5
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);

// Create multiple producers and consumers
Thread p1 = new Thread(new Producer(queue, 1), "Producer-1");
Thread p2 = new Thread(new Producer(queue, 2), "Producer-2");
Thread c1 = new Thread(new Consumer(queue, 1), "Consumer-1");
Thread c2 = new Thread(new Consumer(queue, 2), "Consumer-2");
Thread c3 = new Thread(new Consumer(queue, 3), "Consumer-3");

p1.start();
p2.start();
c1.start();
c2.start();
c3.start();
}
}

Why BlockingQueue is Best:

  1. Thread-safe by design
  2. No manual synchronization needed
  3. Clean, readable code
  4. Multiple implementations available
  5. Built-in timeout support
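
As a quick illustration of point 5, the sketch below contrasts the non-blocking and timed variants of offer and poll on a queue of capacity 1. The class name QueueTimeoutSketch and the demo method are invented for this example; put() and take(), used above, are the variants that block indefinitely.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical class, for illustration only.
class QueueTimeoutSketch {

    // Runs a fixed sequence of calls and records each return value.
    static String demo() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        StringBuilder log = new StringBuilder();
        try {
            // offer(e): returns immediately, true if there was room
            log.append(queue.offer(1));
            // Queue is now full: offer(e) returns false without waiting
            log.append(',').append(queue.offer(2));
            // offer(e, timeout, unit): waits up to 50 ms for space, then gives up
            log.append(',').append(queue.offer(3, 50, TimeUnit.MILLISECONDS));
            // poll(): returns the head immediately, or null if the queue is empty
            log.append(',').append(queue.poll());
            log.append(',').append(queue.poll());
            // poll(timeout, unit): waits up to 50 ms for an item, then returns null
            log.append(',').append(queue.poll(50, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints true,false,false,1,null,null
    }
}
```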

BlockingQueue Implementations:

import java.util.concurrent.*;

// Fixed capacity, array-backed
BlockingQueue<Integer> queue1 = new ArrayBlockingQueue<>(100);

// Linked nodes; optionally bounded (this constructor defaults to Integer.MAX_VALUE)
BlockingQueue<Integer> queue2 = new LinkedBlockingQueue<>();

// Unbounded priority queue; put() never blocks
BlockingQueue<Integer> queue3 = new PriorityBlockingQueue<>();

// No storage, direct handoff
BlockingQueue<Integer> queue4 = new SynchronousQueue<>();

// Delayed elements
BlockingQueue<Delayed> queue5 = new DelayQueue<>();
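
DelayQueue is the one implementation here that needs extra work, because its elements must implement Delayed. The following is a rough sketch under that assumption; DelayedMessage and DelayQueueSketch are hypothetical names chosen for the example.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical element type: becomes available delayMillis after creation.
class DelayedMessage implements Delayed {
    private final String text;
    private final long readyAtNanos;

    DelayedMessage(String text, long delayMillis) {
        this.text = text;
        this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    String getText() {
        return text;
    }

    // Remaining delay; zero or negative means the element may be taken.
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    // Orders elements so the one that expires first sits at the head.
    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

// Hypothetical demo class, for illustration only.
class DelayQueueSketch {

    // Inserts a slow element before a fast one and reports the order in which they come out.
    static String takeOrder() {
        DelayQueue<DelayedMessage> queue = new DelayQueue<>();
        queue.put(new DelayedMessage("slow", 200));
        queue.put(new DelayedMessage("fast", 20));
        try {
            // take() blocks until the head element's delay has expired
            return queue.take().getText() + "," + queue.take().getText();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(takeOrder()); // prints fast,slow
    }
}
```

Insertion order does not matter here: the element with the shortest remaining delay is released first.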

Version 5: With Timeout and Graceful Shutdown ⭐⭐

import java.util.concurrent.*;

class ImprovedProducer implements Runnable {
private BlockingQueue<Integer> queue;
private int id;
private volatile boolean running = true;

public ImprovedProducer(BlockingQueue<Integer> queue, int id) {
this.queue = queue;
this.id = id;
}

public void shutdown() {
running = false;
}

@Override
public void run() {
try {
int count = 0;
while (running) {
int item = id * 1000 + count++;

// Try to add with timeout
boolean added = queue.offer(item, 1, TimeUnit.SECONDS);

if (added) {
System.out.println(Thread.currentThread().getName() +
" produced: " + item);
} else {
System.out.println(Thread.currentThread().getName() +
" timeout, queue full");
}

Thread.sleep(100);
}

System.out.println(Thread.currentThread().getName() + " shutting down");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

class ImprovedConsumer implements Runnable {
private BlockingQueue<Integer> queue;
private int id;
private volatile boolean running = true;

public ImprovedConsumer(BlockingQueue<Integer> queue, int id) {
this.queue = queue;
this.id = id;
}

public void shutdown() {
running = false;
}

@Override
public void run() {
try {
while (running) {
// Try to take with timeout
Integer item = queue.poll(1, TimeUnit.SECONDS);

if (item != null) {
System.out.println(Thread.currentThread().getName() +
" consumed: " + item);
// Process item
Thread.sleep(150);
} else {
System.out.println(Thread.currentThread().getName() +
" timeout, queue empty");
}
}

System.out.println(Thread.currentThread().getName() + " shutting down");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

public class GracefulShutdownExample {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);

ImprovedProducer producer1 = new ImprovedProducer(queue, 1);
ImprovedProducer producer2 = new ImprovedProducer(queue, 2);
ImprovedConsumer consumer1 = new ImprovedConsumer(queue, 1);
ImprovedConsumer consumer2 = new ImprovedConsumer(queue, 2);

Thread p1 = new Thread(producer1, "Producer-1");
Thread p2 = new Thread(producer2, "Producer-2");
Thread c1 = new Thread(consumer1, "Consumer-1");
Thread c2 = new Thread(consumer2, "Consumer-2");

p1.start();
p2.start();
c1.start();
c2.start();

// Run for 5 seconds
Thread.sleep(5000);

// Graceful shutdown
System.out.println("\n=== Initiating shutdown ===\n");
producer1.shutdown();
producer2.shutdown();
consumer1.shutdown();
consumer2.shutdown();

// Wait for threads to finish
p1.join();
p2.join();
c1.join();
c2.join();

System.out.println("\n=== All threads terminated ===");
}
}
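
One caveat with flag-based shutdown: consumers stop as soon as their flag is cleared, so items that were already queued may never be processed. A possible way to handle this, sketched below with the hypothetical helper drainRemaining, is to collect the leftovers with drainTo() after all threads have been joined and then process, persist, or log them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical class and method names, for illustration only.
class DrainOnShutdownSketch {

    // Removes everything still queued and returns it in queue order.
    static List<Integer> drainRemaining(BlockingQueue<Integer> queue) {
        List<Integer> leftovers = new ArrayList<>();
        queue.drainTo(leftovers);
        return leftovers;
    }

    // Simulates a queue that still holds items when the consumers have stopped.
    static List<Integer> demo() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
        queue.offer(1001);
        queue.offer(1002);
        queue.offer(2001);
        return drainRemaining(queue);
    }

    public static void main(String[] args) {
        System.out.println("Unprocessed items: " + demo());
    }
}
```

In GracefulShutdownExample, such a call would go after the four join() calls, when no other thread is touching the queue any more.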

Real-World Examples

Example 1: Log Processing System

import java.util.concurrent.*;

class LogEntry {
private String message;
private long timestamp;

public LogEntry(String message) {
this.message = message;
this.timestamp = System.currentTimeMillis();
}

public String getMessage() { return message; }
public long getTimestamp() { return timestamp; }

@Override
public String toString() {
return "[" + timestamp + "] " + message;
}
}

class LogProducer implements Runnable {
private BlockingQueue<LogEntry> queue;
private String source;

public LogProducer(BlockingQueue<LogEntry> queue, String source) {
this.queue = queue;
this.source = source;
}

@Override
public void run() {
try {
for (int i = 0; i < 20; i++) {
LogEntry entry = new LogEntry(source + " - Event " + i);
queue.put(entry);
Thread.sleep(ThreadLocalRandom.current().nextInt(100, 300));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

class LogProcessor implements Runnable {
private BlockingQueue<LogEntry> queue;
private String name;

public LogProcessor(BlockingQueue<LogEntry> queue, String name) {
this.queue = queue;
this.name = name;
}

@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
LogEntry entry = queue.poll(1, TimeUnit.SECONDS);
if (entry != null) {
processLog(entry);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

private void processLog(LogEntry entry) throws InterruptedException {
System.out.println(name + " processing: " + entry);
// Simulate processing
Thread.sleep(200);
}
}

public class LogProcessingSystem {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<LogEntry> queue = new LinkedBlockingQueue<>(50);

// Multiple log sources (producers)
ExecutorService producers = Executors.newFixedThreadPool(3);
producers.submit(new LogProducer(queue, "WebServer"));
producers.submit(new LogProducer(queue, "Database"));
producers.submit(new LogProducer(queue, "Cache"));

// Multiple log processors (consumers)
ExecutorService consumers = Executors.newFixedThreadPool(2);
consumers.submit(new LogProcessor(queue, "Processor-1"));
consumers.submit(new LogProcessor(queue, "Processor-2"));

// Shutdown after work is done
producers.shutdown();
producers.awaitTermination(1, TimeUnit.MINUTES);

Thread.sleep(5000); // Let consumers finish
consumers.shutdownNow();
}
}

Example 2: Image Processing Pipeline

import java.util.concurrent.*;
import java.util.List;
import java.util.ArrayList;

class Image {
private String filename;
private byte[] data;

public Image(String filename, byte[] data) {
this.filename = filename;
this.data = data;
}

public String getFilename() { return filename; }
public byte[] getData() { return data; }
}

class ImageLoader implements Runnable {
private BlockingQueue<Image> queue;
private List<String> files;

public ImageLoader(BlockingQueue<Image> queue, List<String> files) {
this.queue = queue;
this.files = files;
}

@Override
public void run() {
try {
for (String file : files) {
// Simulate loading image
System.out.println("Loading: " + file);
Thread.sleep(100);
byte[] data = new byte[1024]; // Simulated image data
queue.put(new Image(file, data));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

class ImageProcessor implements Runnable {
private BlockingQueue<Image> inputQueue;
private BlockingQueue<Image> outputQueue;

public ImageProcessor(BlockingQueue<Image> input, BlockingQueue<Image> output) {
this.inputQueue = input;
this.outputQueue = output;
}

@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
Image img = inputQueue.poll(1, TimeUnit.SECONDS);
if (img != null) {
// Process image
System.out.println("Processing: " + img.getFilename());
Thread.sleep(200);
outputQueue.put(img);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

class ImageSaver implements Runnable {
private BlockingQueue<Image> queue;

public ImageSaver(BlockingQueue<Image> queue) {
this.queue = queue;
}

@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
Image img = queue.poll(1, TimeUnit.SECONDS);
if (img != null) {
// Save image
System.out.println("Saving: " + img.getFilename());
Thread.sleep(150);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

public class ImageProcessingPipeline {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Image> loadQueue = new ArrayBlockingQueue<>(10);
BlockingQueue<Image> saveQueue = new ArrayBlockingQueue<>(10);

// Create file list
List<String> files = new ArrayList<>();
for (int i = 1; i <= 20; i++) {
files.add("image" + i + ".jpg");
}

// Create pipeline
Thread loader = new Thread(new ImageLoader(loadQueue, files));
Thread processor1 = new Thread(new ImageProcessor(loadQueue, saveQueue));
Thread processor2 = new Thread(new ImageProcessor(loadQueue, saveQueue));
Thread saver = new Thread(new ImageSaver(saveQueue));

loader.start();
processor1.start();
processor2.start();
saver.start();

loader.join();
Thread.sleep(10000); // Wait for processing
processor1.interrupt();
processor2.interrupt();
saver.interrupt();
}
}

Advanced Patterns

Pattern 1: Poison Pill for Shutdown

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class Task {
private String data;
public static final Task POISON_PILL = new Task(null);

public Task(String data) {
this.data = data;
}

public boolean isPoisonPill() {
return this == POISON_PILL;
}

public String getData() {
return data;
}
}

class PoisonPillConsumer implements Runnable {
private BlockingQueue<Task> queue;

public PoisonPillConsumer(BlockingQueue<Task> queue) {
this.queue = queue;
}

@Override
public void run() {
try {
while (true) {
Task task = queue.take();

if (task.isPoisonPill()) {
System.out.println("Received poison pill, shutting down");
break;
}

// Process task
System.out.println("Processing: " + task.getData());
Thread.sleep(100);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

public class PoisonPillExample {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Task> queue = new ArrayBlockingQueue<>(10);

Thread consumer = new Thread(new PoisonPillConsumer(queue));
consumer.start();

// Producer adds tasks
for (int i = 0; i < 5; i++) {
queue.put(new Task("Task-" + i));
}

// Send poison pill to signal shutdown
queue.put(Task.POISON_PILL);

consumer.join();
System.out.println("Consumer terminated gracefully");
}
}
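
The example above has a single consumer. With several consumers, each one stops after taking a pill, so a common approach is to enqueue one pill per consumer after the last real task. The sketch below assumes a FIFO queue and uses hypothetical names (Job, MultiConsumerPoisonPillSketch) to stay separate from the classes above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical task type with a shared sentinel instance.
class Job {
    static final Job POISON_PILL = new Job(null);
    final String data;

    Job(String data) {
        this.data = data;
    }
}

// Hypothetical demo class, for illustration only.
class MultiConsumerPoisonPillSketch {

    // Starts consumerCount consumers, feeds taskCount jobs, and returns how many were processed.
    static int run(int consumerCount, int taskCount) {
        BlockingQueue<Job> queue = new LinkedBlockingQueue<>();
        AtomicInteger processed = new AtomicInteger();
        List<Thread> consumers = new ArrayList<>();

        for (int i = 0; i < consumerCount; i++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        Job job = queue.take();
                        if (job == Job.POISON_PILL) {
                            break; // this consumer is done; other pills remain for the others
                        }
                        processed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumers.add(t);
            t.start();
        }

        try {
            for (int i = 0; i < taskCount; i++) {
                queue.put(new Job("Task-" + i));
            }
            // One pill per consumer, queued after all real work (FIFO order)
            for (int i = 0; i < consumerCount; i++) {
                queue.put(Job.POISON_PILL);
            }
            for (Thread t : consumers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("Processed: " + run(3, 20));
    }
}
```

Because the pills sit behind every real task, all tasks are processed before the last consumer exits.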

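Pattern 2: Priority Queue (Priority Processing)

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

class PriorityTask implements Comparable<PriorityTask> {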
private String data;
private int priority;

public PriorityTask(String data, int priority) {
this.data = data;
this.priority = priority;
}

@Override
public int compareTo(PriorityTask other) {
return Integer.compare(other.priority, this.priority); // Higher priority first
}

public String getData() { return data; }
public int getPriority() { return priority; }
}

public class PriorityProcessingExample {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<PriorityTask> queue = new PriorityBlockingQueue<>();

// Consumer
Thread consumer = new Thread(() -> {
try {
while (!Thread.currentThread().isInterrupted()) {
PriorityTask task = queue.poll(1, TimeUnit.SECONDS);
if (task != null) {
System.out.println("Processing (Priority " +
task.getPriority() + "): " +
task.getData());
Thread.sleep(500);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
consumer.start();

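// Producer adds tasks with different priorities. The consumer is already
// running, so it may take the first task before the others are queued;
// the remaining tasks are then served highest priority first.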
queue.put(new PriorityTask("Low priority task", 1));
queue.put(new PriorityTask("High priority task", 10));
queue.put(new PriorityTask("Medium priority task", 5));
queue.put(new PriorityTask("Critical task", 20));

Thread.sleep(5000);
consumer.interrupt();
}
}
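
To see the ordering without any timing effects, the sketch below queues every item before removing any. It uses plain Integer priorities and a reversed comparator instead of PriorityTask to stay short; the class name PriorityOrderSketch is made up for the example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical class, for illustration only.
class PriorityOrderSketch {

    // Queues four priorities in arbitrary order, then removes them one by one.
    static List<Integer> drainOrder() {
        // Reverse natural ordering so that larger numbers are removed first
        PriorityBlockingQueue<Integer> queue =
                new PriorityBlockingQueue<>(11, Comparator.<Integer>reverseOrder());
        queue.put(1);
        queue.put(10);
        queue.put(5);
        queue.put(20);

        List<Integer> order = new ArrayList<>();
        Integer next;
        while ((next = queue.poll()) != null) {
            order.add(next);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(drainOrder()); // prints [20, 10, 5, 1]
    }
}
```

Only removal (poll, take) follows priority order; iterating over a PriorityBlockingQueue does not guarantee any particular order.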

Key Takeaways

Race Condition Prevention

  1. ✅ Use proper synchronization (synchronized, locks)
  2. ✅ Make operations atomic
  3. ✅ Use thread-safe data structures
  4. ✅ Minimize shared mutable state
  5. ✅ Always use while loops with wait conditions

Producer-Consumer Best Practices

  1. Use BlockingQueue - Simple, safe, efficient
  2. Bounded buffers - Prevent memory issues
  3. Graceful shutdown - Use poison pills or volatile flags
  4. Handle interrupts - Properly terminate threads
  5. Use thread pools - Better resource management

Solution Comparison

Approach                      Complexity   Performance   Recommended
--------                      ----------   -----------   -----------
Unsafe                        Low          High          ❌ Never
synchronized + wait/notify    Medium       Medium        ✅ Learning
ReentrantLock + Conditions    High         Medium        ✅ Advanced control
BlockingQueue                 Low          High          ⭐ Production
Atomic variables              Low          Very High     ⭐ Simple counters

When to Use What

  • Simple producer-consumer: BlockingQueue
  • Complex coordination: ReentrantLock with multiple conditions
  • Legacy code: synchronized with wait/notify
  • Priority processing: PriorityBlockingQueue
  • Direct handoff: SynchronousQueue
  • Delayed tasks: DelayQueue
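
Direct handoff is the least obvious entry in this list, so here is a small sketch of how SynchronousQueue behaves. The class name HandoffSketch and its demo method are assumptions for illustration. The queue has no capacity: an insert only succeeds when a consumer is ready to receive the item.

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical class, for illustration only.
class HandoffSketch {

    static String demo() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();

        // No consumer is waiting yet, so a non-blocking offer fails: nothing can be stored.
        boolean offeredWithoutConsumer = queue.offer("dropped");

        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take(); // waits for a producer to hand over an item
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            queue.put("handed-off"); // blocks until the consumer takes the item
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return offeredWithoutConsumer + "," + received[0];
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints false,handed-off
    }
}
```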

Remember: The best solution is the simplest one that meets your requirements!